[Feat] Add Ray Cron Job #4159
base: master
Conversation
```go
type RayCronJobSpec struct {
	// JobTemplate defines the job spec that will be created by cron scheduling
	JobTemplate *RayJobSpec `json:"jobTemplate"`
	// Schedule is the cron schedule string
	Schedule string `json:"schedule"`
}
```
Not sure if this is alright, but I think we should introduce a separate struct (e.g. RayJobTemplateSpec) to hold both the metadata and the spec for the generated RayJob, similar to how Kubernetes models JobTemplateSpec in CronJob.
This would allow users to specify metadata inside jobTemplate, which we can then propagate to the created RayJob. It also keeps the API aligned with common Kubernetes patterns.
WDYT?
Not sure if we want this. In RayJobSpec, when we specify the RayClusterSpec, we only set the RayClusterSpec itself rather than also letting the user set the ObjectMeta:
```go
RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
```
cc @rueian for confirmation.
No need for it at the moment, this can be a future work.
The current structure looks good to me already.
Signed-off-by: machichima <nary12321@gmail.com>
```yaml
rayVersion: '2.46.0' # should match the Ray version in the image of the containers
# Ray head pod template
```
Can we use the 2.52.0 version? Thank you!
Updated in 6e3c37a. Thank you!
```go
// These are the only two places where we update the RayCronJob status. This will directly
// update the ScheduleStatus to ValidationFailed if there's a validation error.
if err = r.updateRayCronJobStatus(ctx, originalRayCronJobInstance, rayCronJobInstance); err != nil {
	logger.Info("Failed to update RayCronJob status", "error", err)
	return ctrl.Result{RequeueAfter: RayCronJobDefaultRequeueDuration}, err
}
```
No need to update the status on validation failure, since we don't use ScheduleStatus for RayCronJob.
```go
// Create RayCronJob with invalid cron schedule
rayCronJob := &rayv1.RayCronJob{
	ObjectMeta: metav1.ObjectMeta{
		Name:      "invalid-cronjob",
		Namespace: "default",
	},
	Spec: rayv1.RayCronJobSpec{
		Schedule: "invalid cron string",
		JobTemplate: &rayv1.RayJobSpec{
			Entrypoint: "python test.py",
			RayClusterSpec: &rayv1.RayClusterSpec{
				HeadGroupSpec: rayv1.HeadGroupSpec{
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							Containers: []corev1.Container{
								{
									Name:  "ray-head",
									Image: "rayproject/ray:2.9.0",
								},
							},
						},
					},
				},
			},
		},
	},
}
```
Can we create a template function for the unit tests? Like this:
```go
func rayClusterTemplate(name string, namespace string) *rayv1.RayCluster {
```
win5923 left a comment
```go
if rayCronJobInstance.Status.LastScheduleTime == nil {
	// The new RayCronJob, not yet scheduled
	rayCronJobInstance.Status.LastScheduleTime = &metav1.Time{Time: now}
```
Why do we only update the LastScheduleTime without creating the rayjob in the first reconciliation?
The first reconciliation just prepares the schedule. We don't create the job yet because we need to check whether the scheduled time has actually arrived (e.g., if the job runs every minute but it's currently 15:10:30, we shouldn't create it yet).
Therefore, the first reconcile just sets the current time as LastScheduleTime and sets the requeue delay to `future schedule time - now`.
I have the same question: is this how the Kubernetes Job API works?
> I have the same question: is this how the Kubernetes Job API works?
Not really, Kubernetes CronJob uses the CreationTimestamp.Time to compute the next schedule time when evaluating it for the first time:
```go
func mostRecentScheduleTime(cj *batchv1.CronJob, now time.Time, ...) {
	earliestTime := cj.ObjectMeta.CreationTimestamp.Time
	if cj.Status.LastScheduleTime != nil {
		earliestTime = cj.Status.LastScheduleTime.Time
	}
	// ...
}
```
The CronJob also uses two schedule times (t1 and t2) to calculate the next time and to handle missed schedules: if the current time falls between t1 and t2, the controller will create a Job.
```go
t1 := schedule.Next(earliestTime)
t2 := schedule.Next(t1)
if now.Before(t1) {
	return earliestTime, nil, missedSchedules, nil
}
if now.Before(t2) {
	return earliestTime, &t1, missedSchedules, nil
}
```
```mermaid
sequenceDiagram
    participant R as Reconciler
    participant M as mostRecentScheduleTime()
    participant K as K8s API
    Note over R: CronJob created at 10:05<br/>Schedule: "0 * * * *"
    R->>M: First reconcile (10:05)
    M->>M: earliestTime = CreationTimestamp (10:05)
    M->>M: t1 = schedule.Next(10:05) = 11:00
    M->>M: now (10:05) < t1 (11:00) ?
    M-->>R: Return nil (not time yet)
    Note over R: LastScheduleTime stays nil<br/>Requeue until 11:00
    R->>M: Second reconcile (11:00)
    M->>M: earliestTime = CreationTimestamp (10:05)
    M->>M: t1 = 11:00, t2 = 12:00
    M->>M: now (11:00) >= t1 && now < t2 ?
    M-->>R: Return &t1 (11:00) ✅
    R->>K: CreateJob(scheduledTime: 11:00)
    K-->>R: Job created ✅
    R->>K: Update Status.LastScheduleTime = 11:00
    Note over R: First Job created!
```
And the LastScheduleTime is only updated after creating a Job.
```go
scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
// ...
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = true
```
If I was wrong, please correct me.
```go
func (r *RayCronJobReconciler) updateRayCronJobStatus(ctx context.Context, oldRayCronJob *rayv1.RayCronJob, newRayCronJob *rayv1.RayCronJob) error {
	logger := ctrl.LoggerFrom(ctx)
	oldRayCronJobStatus := oldRayCronJob.Status
	newRayCronJobStatus := newRayCronJob.Status
	if oldRayCronJobStatus.LastScheduleTime != newRayCronJobStatus.LastScheduleTime {
		logger.Info("updateRayCronJobStatus", "old RayCronJobStatus", oldRayCronJobStatus, "new RayCronJobStatus", newRayCronJobStatus)
		if err := r.Status().Update(ctx, newRayCronJob); err != nil {
			return err
		}
	}
	return nil
}
```
Does this include the first time we initialize LastScheduleTime?

Why are these changes needed?
Support cron job scheduling, following this design doc and implementing milestone 1 in this PR.
Main changes:
- RayCronJob CRD and controller
- RayCronJob test
Apply the sample YAML `ray-operator/config/samples/ray-cronjob.sample.yaml`. RayJobs are being scheduled every minute.
Trigger validation error
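For reference, a manifest along these lines could exercise the feature. This is a sketch only: the `schedule` and `jobTemplate` field names follow the `RayCronJobSpec` shown in this PR, while the `apiVersion`, image, and entrypoint values are illustrative assumptions rather than the contents of the actual sample file.

```yaml
# Hypothetical RayCronJob manifest (illustrative values)
apiVersion: ray.io/v1
kind: RayCronJob
metadata:
  name: raycronjob-sample
spec:
  schedule: "* * * * *"   # fire every minute
  jobTemplate:
    entrypoint: python /home/ray/samples/sample_code.py
    rayClusterSpec:
      rayVersion: '2.52.0'  # should match the Ray version in the container image
      headGroupSpec:
        template:
          spec:
            containers:
              - name: ray-head
                image: rayproject/ray:2.52.0
```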
Related issue number
Following comment: #2426 (comment)
Closes #2426
Checks